package rabbitmq

import (
	"context"
	"gitee.com/lipore/plume/mq"
	"github.com/pkg/errors"
	"github.com/rabbitmq/amqp091-go"
	"sync"
)

type Message struct {
	amqp091.Delivery
}

func (m Message) Body() []byte {
	return m.Delivery.Body
}

func (m Message) Ack() error {
	return m.Acknowledger.Ack(m.DeliveryTag, false)
}

func (m Message) NoAck() error {
	return m.Acknowledger.Nack(m.DeliveryTag, false, true)
}

func (m Message) ID() string {
	return m.MessageId
}

type Subscriber struct {
	ctx       context.Context
	cancel    context.CancelFunc
	channel   *amqp091.Channel
	onMessage func(message mq.Message) error
}

func (s *Subscriber) shouldListenAndRun(queue string) error {
	msgs, err := s.channel.ConsumeWithContext(s.ctx, queue, "", false, false, false, false, nil)
	if err != nil {
		return errors.WithMessage(err, "failed to consume message")
	}
	go func() {
		for msg := range msgs {
			err := s.onMessage(Message{Delivery: msg})
			if err != nil {
				panic(errors.WithMessage(err, "failed to process message"))
			}
		}
	}()
	return nil
}

func (s *Subscriber) OnMessage(message mq.Message) error {
	return s.onMessage(message)
}

func NewSubscriberRegister(channel *amqp091.Channel) mq.SubscriberRegister {
	return &SubscriberRegister{
		channel:       channel,
		subscriberMap: make(map[string]*Subscriber),
	}
}

type SubscriberRegister struct {
	sync.Mutex
	subscriberMap map[string]*Subscriber
	channel       *amqp091.Channel
}

func (s *SubscriberRegister) Subscribe(ctx context.Context, subscription string, onMessage func(message mq.Message) error) error {
	c, cancel := context.WithCancel(ctx)
	s.Lock()
	defer s.Unlock()
	if s.subscriberMap == nil {
		s.subscriberMap = make(map[string]*Subscriber)
	}
	if sub, exist := s.subscriberMap[subscription]; exist {
		sub.cancel()
	}
	subscriber := &Subscriber{
		ctx:       c,
		cancel:    cancel,
		onMessage: onMessage,
		channel:   s.channel,
	}
	s.subscriberMap[subscription] = subscriber
	return subscriber.shouldListenAndRun(subscription)
}
